## 智能维护概述
Claude Code 可以自动化维护任务、监控系统健康状态、预测潜在问题,并提供维护优化建议。通过智能维护功能,开发者可以降低维护成本、提高系统可靠性。
## 自动化维护
### 1. 依赖管理
示例:自动化依赖管理
用户请求: "为项目实现自动化依赖管理"
Claude Code 生成的代码:
## requirements.txt
```
Flask-SQLAlchemy==3.0.5
Flask-JWT-Extended==4.5.2
Flask-Mail==0.9.1
Flask-Migrate==4.0.5
gunicorn==21.2.0
psycopg2-binary==2.9.7
redis==4.6.0
celery==5.3.4
python-dotenv==1.0.0
```

## requirements-dev.txt

```
pytest==7.4.2
pytest-cov==4.1.0
pytest-mock==3.11.1
black==23.9.1
flake8==6.1.0
mypy==1.5.1
isort==5.12.0
pre-commit==3.4.0
```

## 依赖更新脚本

```python
#!/usr/bin/env python3
"""Dependency update script.

Lists outdated packages, dry-run installs each latest version to check that
it resolves, upgrades the ones that do, and rewrites requirements.txt.
"""
import json
import subprocess
import sys
from typing import Dict, List


def _pip(*args: str) -> subprocess.CompletedProcess:
    """Run pip for the interpreter that is executing this script.

    Using ``sys.executable -m pip`` instead of a bare ``pip`` guarantees the
    command targets the same environment as the script itself.
    """
    return subprocess.run(
        [sys.executable, '-m', 'pip', *args],
        capture_output=True,
        text=True,
    )


def get_outdated_packages() -> List[Dict]:
    """Return pip's list of outdated packages, or ``[]`` on failure.

    Each entry is a dict as emitted by ``pip list --outdated --format=json``
    (this script reads the ``name``, ``version`` and ``latest_version`` keys).
    """
    result = _pip('list', '--outdated', '--format=json')
    if result.returncode != 0:
        print("Error: Failed to get outdated packages")
        return []

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # pip succeeded but did not print JSON; treat it like a failure.
        print("Error: Failed to get outdated packages")
        return []


def check_compatibility(package: str, version: str) -> bool:
    """Return True if ``package==version`` resolves in a dry-run install.

    NOTE: ``pip install --dry-run`` requires pip 22.2 or newer.
    """
    try:
        result = _pip('install', '--dry-run', f'{package}=={version}')
    except OSError as e:
        print(f"Error checking compatibility for {package}: {e}")
        return False
    return result.returncode == 0


def update_package(package: str, version: str) -> bool:
    """Install ``package==version``; return True when pip exits cleanly."""
    try:
        result = _pip('install', '--upgrade', f'{package}=={version}')
    except OSError as e:
        print(f"Error updating {package}: {e}")
        return False
    return result.returncode == 0


def update_requirements() -> bool:
    """Rewrite requirements.txt from ``pip freeze``; return True on success.

    The file is only opened for writing after ``pip freeze`` succeeded, so a
    failing pip can no longer truncate an existing requirements.txt.

    NOTE(review): ``pip freeze`` pins every installed package, including
    transitive and development dependencies.
    """
    try:
        result = _pip('freeze')
        if result.returncode != 0:
            print(f"Error updating requirements.txt: {result.stderr.strip()}")
            return False

        with open('requirements.txt', 'w', encoding='utf-8') as f:
            f.write(result.stdout)
        return True
    except OSError as e:
        print(f"Error updating requirements.txt: {e}")
        return False


def main():
    """Interactive entry point: report, confirm, upgrade, re-freeze."""
    print("检查过期依赖...")
    outdated_packages = get_outdated_packages()

    if not outdated_packages:
        print("所有依赖都是最新的!")
        return

    print(f"\n发现 {len(outdated_packages)} 个过期包:")
    for pkg in outdated_packages:
        print(f" - {pkg['name']}: {pkg['version']} -> {pkg['latest_version']}")

    print("\n检查兼容性...")
    compatible_packages = []
    for pkg in outdated_packages:
        package_name = pkg['name']
        latest_version = pkg['latest_version']
        if check_compatibility(package_name, latest_version):
            compatible_packages.append(pkg)
            print(f" ✓ {package_name} {latest_version}")
        else:
            print(f" ✗ {package_name} {latest_version} (不兼容)")

    if not compatible_packages:
        print("\n没有可更新的包")
        return

    print(f"\n准备更新 {len(compatible_packages)} 个包...")
    response = input("是否继续?(y/n): ")
    if response.lower() != 'y':
        print("取消更新")
        return

    print("\n开始更新...")
    for pkg in compatible_packages:
        package_name = pkg['name']
        latest_version = pkg['latest_version']
        print(f"更新 {package_name} 到 {latest_version}...")
        if update_package(package_name, latest_version):
            print(f" ✓ {package_name} 更新成功")
        else:
            print(f" ✗ {package_name} 更新失败")

    print("\n更新 requirements.txt...")
    if update_requirements():
        print(" ✓ requirements.txt 更新成功")
    else:
        print(" ✗ requirements.txt 更新失败")

    print("\n依赖更新完成!")


if __name__ == '__main__':
    main()
```

## 依赖安全检查

```python
#!/usr/bin/env python3
"""Dependency security check script.

Runs pip-audit against the current environment and prints every reported
vulnerability. Requires the ``pip-audit`` package to be installed.
"""
import json
import subprocess
import sys
from typing import Dict, List


def check_security() -> List[Dict]:
    """Return one dict per known vulnerability, or ``[]`` if none/failed.

    Each dict has the keys ``name``, ``version``, ``advisory``, ``severity``
    and ``fix_versions``.

    pip-audit is a separate tool (there is no ``pip audit`` subcommand) and
    it exits with status 1 when it *finds* vulnerabilities, so exit codes 0
    and 1 are both treated as a completed audit.

    NOTE(review): a failed audit also returns ``[]``, which the caller
    cannot distinguish from a clean result; the error is only printed.
    """
    try:
        result = subprocess.run(
            [sys.executable, '-m', 'pip_audit', '--format', 'json'],
            capture_output=True,
            text=True,
        )
    except OSError as e:
        print(f"Error checking security: {e}")
        return []

    if result.returncode not in (0, 1):
        print("Error: Failed to check security")
        return []

    try:
        report = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error checking security: {e}")
        return []

    # Newer pip-audit versions wrap the list in {"dependencies": [...]};
    # older ones emit the list directly.
    if isinstance(report, dict):
        dependencies = report.get('dependencies', [])
    else:
        dependencies = report

    vulnerabilities = []
    for dep in dependencies:
        # Skipped dependencies carry no "vulns" key.
        for vuln in dep.get('vulns', []):
            vulnerabilities.append({
                'name': dep.get('name'),
                'version': dep.get('version'),
                'advisory': vuln.get('id'),
                # pip-audit does not report a severity rating.
                'severity': vuln.get('severity', 'unknown'),
                'fix_versions': vuln.get('fix_versions', []),
            })
    return vulnerabilities


def main():
    """Print a human-readable vulnerability report."""
    print("检查依赖安全性...")
    vulnerabilities = check_security()

    if not vulnerabilities:
        print("未发现安全漏洞!")
        return

    print(f"\n发现 {len(vulnerabilities)} 个安全漏洞:")
    for vuln in vulnerabilities:
        print(f"\n 包: {vuln['name']}")
        print(f" 版本: {vuln['version']}")
        print(f" 漏洞: {vuln['advisory']}")
        print(f" 严重性: {vuln['severity']}")
        print(f" 修复版本: {vuln['fix_versions']}")


if __name__ == '__main__':
    main()
```

### 2. 
数据库维护

示例:自动化数据库维护

用户请求: "为应用实现自动化数据库维护"

Claude Code 生成的代码:

```python
import logging
from datetime import datetime, timedelta

from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

from app import db

logger = logging.getLogger(__name__)

# Statistics queries: result key -> (SQL, output field names in column order).
_STATS_QUERIES = {
    'table_sizes': (
        """
        SELECT
            schemaname,
            tablename,
            pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
        FROM pg_tables
        WHERE schemaname = 'public'
        ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
        """,
        ('schema', 'table', 'size'),
    ),
    # pg_stat_user_indexes names its columns relname / indexrelname.
    'index_usage': (
        """
        SELECT
            schemaname,
            relname,
            indexrelname,
            idx_scan,
            idx_tup_read,
            idx_tup_fetch
        FROM pg_stat_user_indexes
        ORDER BY idx_scan DESC
        """,
        ('schema', 'table', 'index', 'scans', 'tuples_read', 'tuples_fetched'),
    ),
}

# pg_stat_statements renamed its timing columns in PostgreSQL 13
# (total_time -> total_exec_time, ...). Try the new names first.
_SLOW_QUERY_TIME_SUFFIXES = ('exec_time', 'time')
_SLOW_QUERY_FIELDS = ('query', 'calls', 'total_time', 'mean_time', 'max_time')


class DatabaseMaintenance:
    """Routine PostgreSQL maintenance driven through a SQLAlchemy session."""

    TABLES = ('users', 'products', 'orders', 'payments')
    INDEXES = (
        'idx_users_username',
        'idx_users_email',
        'idx_orders_user_id',
        'idx_orders_status',
        'idx_products_name',
    )

    def __init__(self, db_session):
        """Store the SQLAlchemy session used for all maintenance work."""
        self.db = db_session

    def _run_autocommit(self, statement):
        """Execute one maintenance statement outside any transaction.

        VACUUM cannot run inside a transaction block, and a failed statement
        inside the session's transaction would abort every later statement.
        A dedicated AUTOCOMMIT connection avoids both problems.
        """
        bind = self.db.get_bind()
        # get_bind() may return an Engine or a Connection; both expose
        # ``.engine``.
        engine = getattr(bind, 'engine', bind)
        autocommit = engine.connect().execution_options(
            isolation_level='AUTOCOMMIT'
        )
        with autocommit as conn:
            conn.execute(text(statement))

    def analyze_tables(self):
        """Run ANALYZE on every table in ``TABLES``; log each outcome."""
        for table in self.TABLES:
            try:
                self._run_autocommit(f"ANALYZE {table}")
                logger.info(f"Table {table} analyzed successfully")
            except SQLAlchemyError as e:
                logger.error(f"Error analyzing table {table}: {e}")

    def vacuum_tables(self):
        """Run VACUUM ANALYZE on every table in ``TABLES``."""
        for table in self.TABLES:
            try:
                self._run_autocommit(f"VACUUM ANALYZE {table}")
                logger.info(f"Table {table} vacuumed successfully")
            except SQLAlchemyError as e:
                logger.error(f"Error vacuuming table {table}: {e}")

    def rebuild_indexes(self):
        """Run REINDEX INDEX on every index in ``INDEXES``."""
        for index in self.INDEXES:
            try:
                self._run_autocommit(f"REINDEX INDEX {index}")
                logger.info(f"Index {index} rebuilt successfully")
            except SQLAlchemyError as e:
                logger.error(f"Error rebuilding index {index}: {e}")

    def clean_old_logs(self, days=30):
        """Delete ``logs`` rows older than ``days`` days.

        Returns the number of deleted rows, or 0 if the delete failed (the
        transaction is rolled back in that case).
        """
        # NOTE(review): utcnow() is naive; presumably logs.created_at stores
        # naive UTC — confirm against the schema.
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        try:
            result = self.db.execute(
                text("""
                    DELETE FROM logs
                    WHERE created_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )
            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old log entries")
            self.db.commit()
            return deleted_count
        except SQLAlchemyError as e:
            logger.error(f"Error cleaning old logs: {e}")
            self.db.rollback()
            return 0

    def clean_old_sessions(self, days=7):
        """Delete ``sessions`` rows that expired more than ``days`` days ago.

        Returns the number of deleted rows, or 0 if the delete failed (the
        transaction is rolled back in that case).
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        try:
            result = self.db.execute(
                text("""
                    DELETE FROM sessions
                    WHERE expires_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )
            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old sessions")
            self.db.commit()
            return deleted_count
        except SQLAlchemyError as e:
            logger.error(f"Error cleaning old sessions: {e}")
            self.db.rollback()
            return 0

    def optimize_database(self):
        """Analyze, vacuum and reindex in sequence; never raises."""
        try:
            self.analyze_tables()
            self.vacuum_tables()
            self.rebuild_indexes()
            logger.info("Database optimization completed successfully")
        except Exception as e:
            # Top-level boundary for the maintenance job: log and carry on.
            logger.error(f"Error optimizing database: {e}")

    def _query_rows(self, sql, fields):
        """Run ``sql`` and return its rows as dicts keyed by ``fields``."""
        rows = self.db.execute(text(sql))
        return [dict(zip(fields, row)) for row in rows]

    def get_database_stats(self):
        """Collect table sizes, index usage and the slowest statements.

        Always returns a dict with the keys ``table_sizes``, ``index_usage``
        and ``slow_queries``. A query that fails (for example because the
        pg_stat_statements extension is not installed) is logged and leaves
        its key as an empty list instead of dropping the key.
        """
        stats = {'table_sizes': [], 'index_usage': [], 'slow_queries': []}

        for key, (sql, fields) in _STATS_QUERIES.items():
            try:
                stats[key] = self._query_rows(sql, fields)
            except SQLAlchemyError as e:
                logger.error(f"Error getting database stats: {e}")
                # Clear the aborted transaction so the next query can run.
                self.db.rollback()

        last_error = None
        for suffix in _SLOW_QUERY_TIME_SUFFIXES:
            sql = f"""
                SELECT
                    query,
                    calls,
                    total_{suffix},
                    mean_{suffix},
                    max_{suffix}
                FROM pg_stat_statements
                ORDER BY mean_{suffix} DESC
                LIMIT 10
            """
            try:
                stats['slow_queries'] = self._query_rows(sql, _SLOW_QUERY_FIELDS)
                break
            except SQLAlchemyError as e:
                last_error = e
                self.db.rollback()
        else:
            logger.error(f"Error getting database stats: {last_error}")

        return stats


def run_maintenance():
    """Run the full maintenance job and print a short summary."""
    maintenance = DatabaseMaintenance(db.session)

    print("开始数据库维护...")

    # Optimize the database.
    print("优化数据库...")
    maintenance.optimize_database()

    # Purge stale rows.
    print("清理旧日志...")
    maintenance.clean_old_logs(days=30)

    print("清理旧会话...")
    maintenance.clean_old_sessions(days=7)

    # Collect statistics.
    print("获取数据库统计信息...")
    stats = maintenance.get_database_stats()

    print("\n数据库统计信息:")
    print(f" 表数量: {len(stats['table_sizes'])}")
    print(f" 索引数量: {len(stats['index_usage'])}")
    print(f" 慢查询数量: {len(stats['slow_queries'])}")

    print("\n数据库维护完成!")


if __name__ == '__main__':
    run_maintenance()
```

## 系统监控

### 1. 性能监控

示例:实现性能监控

用户请求: "为应用实现性能监控"

Claude Code 生成的代码:

```python
import logging
import time
from functools import wraps

import psutil
from flask import request, g
from prometheus_client import Counter, Histogram, Gauge, generate_latest

logger = logging.getLogger(__name__)

# Prometheus metrics
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

active_connections = Gauge(
    'active_connections',
    'Number of active connections'
)

memory_usage = Gauge(
    'memory_usage_bytes',
    'Memory usage in bytes'
)

cpu_usage = Gauge(
    'cpu_usage_percent',
    'CPU usage percentage'
)

disk_usage = Gauge(
    'disk_usage_percent',
    'Disk usage percentage'
)


def _status_of(response):
    """Best-effort HTTP status of a Flask view return value."""
    if hasattr(response, 'status_code'):
        return response.status_code
    # Views may return (body, status) or (body, status, headers).
    if (
        isinstance(response, tuple)
        and len(response) > 1
        and isinstance(response[1], int)
    ):
        return response[1]
    return 200


def track_request_metrics(f):
    """Decorate a view so each call updates the request counter/histogram.

    The duration is observed for failed calls as well; a call that raises is
    counted with status 500 and the exception is re-raised.

    NOTE(review): setup_monitoring() also counts every request in its
    after_request hook, so a view that is decorated AND served by an app
    with those hooks is counted twice.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()

        # Expose the start time to other request hooks.
        g.start_time = start_time

        endpoint = request.endpoint or 'unknown'
        status_code = 500  # assumed until the view returns normally
        try:
            response = f(*args, **kwargs)
            status_code = _status_of(response)
            return response
        finally:
            request_count.labels(
                method=request.method,
                endpoint=endpoint,
                status=status_code
            ).inc()
            request_duration.labels(
                method=request.method,
                endpoint=endpoint
            ).observe(time.time() - start_time)

    return decorated_function


def update_system_metrics():
    """Refresh the memory, CPU, disk and connection gauges from psutil."""
    # Memory
    memory = psutil.virtual_memory()
    memory_usage.set(memory.used)

    # CPU: with no interval, the value is relative to the previous call, so
    # the very first sample is a meaningless 0.0.
    cpu_usage.set(psutil.cpu_percent())

    # Disk
    disk = psutil.disk_usage('/')
    disk_usage.set(disk.percent)

    # Connections. Listing sockets needs elevated privileges on some
    # platforms; skip the gauge rather than fail the whole refresh.
    # NOTE(review): setup_monitoring() also inc()/dec()s this same gauge per
    # request, so the two writers overwrite each other — pick one meaning.
    try:
        active_connections.set(len(psutil.net_connections()))
    except psutil.Error as e:
        logger.warning("Could not read network connections: %s", e)


class PerformanceMonitor:
    """In-process timing statistics for decorated functions."""

    def __init__(self, app):
        """Keep a reference to ``app`` and start with no recorded metrics."""
        self.app = app
        self.metrics = {}

    def track_function(self, name):
        """Return a decorator that times each call and records it as ``name``.

        Exceptions are recorded as failures, logged, and re-raised.
        """
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                start_time = time.time()
                try:
                    result = f(*args, **kwargs)
                except Exception as e:
                    duration = time.time() - start_time
                    self.record_metric(name, duration, success=False)
                    logger.error(f"Error in {name}: {e}")
                    raise
                duration = time.time() - start_time
                self.record_metric(name, duration, success=True)
                return result

            return decorated_function
        return decorator

    def record_metric(self, name, duration, success=True):
        """Fold one observation of ``duration`` seconds into ``name``'s stats."""
        if name not in self.metrics:
            self.metrics[name] = {
                'count': 0,
                'total_duration': 0,
                'success_count': 0,
                'error_count': 0,
                'min_duration': float('inf'),
                'max_duration': 0
            }

        metric = self.metrics[name]
        metric['count'] += 1
        metric['total_duration'] += duration

        if success:
            metric['success_count'] += 1
        else:
            metric['error_count'] += 1

        metric['min_duration'] = min(metric['min_duration'], duration)
        metric['max_duration'] = max(metric['max_duration'], duration)

    def get_metrics(self):
        """Return all metrics, refreshing ``avg_duration``/``success_rate``.

        The derived fields are written into the stored dicts, which are
        returned by reference.
        """
        for metric in self.metrics.values():
            if metric['count'] > 0:
                metric['avg_duration'] = metric['total_duration'] / metric['count']
                metric['success_rate'] = metric['success_count'] / metric['count']

        return self.metrics

    def get_slow_functions(self, threshold=1.0):
        """List functions whose average duration exceeds ``threshold`` seconds.

        The result is sorted slowest first.
        """
        slow_functions = []

        for name, metric in self.metrics.items():
            if metric['count'] > 0:
                avg_duration = metric['total_duration'] / metric['count']
                if avg_duration > threshold:
                    slow_functions.append({
                        'name': name,
                        'avg_duration': avg_duration,
                        'count': metric['count'],
                        'max_duration': metric['max_duration']
                    })

        return sorted(slow_functions, key=lambda x: x['avg_duration'], reverse=True)
```
```python
def setup_monitoring(app):
    """Attach Prometheus monitoring to a Flask ``app`` and return it.

    Registers a ``/metrics`` endpoint, per-request counting and timing
    hooks, and a daemon thread that refreshes the system gauges every
    5 seconds.
    """
    import threading

    from prometheus_client import CONTENT_TYPE_LATEST

    @app.route('/metrics')
    def metrics():
        """Prometheus scrape endpoint."""
        # Scrapers expect the exposition-format content type.
        return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}

    @app.before_request
    def before_request():
        """Stamp the request start time and count it as in flight."""
        g.start_time = time.time()
        active_connections.inc()
        # Remember the increment so teardown only undoes what was done.
        g.connection_counted = True

    @app.after_request
    def after_request(response):
        """Record the request count and duration for this response."""
        if hasattr(g, 'start_time'):
            duration = time.time() - g.start_time

            request_count.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown',
                status=response.status_code
            ).inc()

            request_duration.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown'
            ).observe(duration)

        return response

    @app.teardown_request
    def teardown_request(exc):
        """Release the in-flight slot taken in before_request.

        after_request hooks are not guaranteed to run (for example when an
        exception propagates), which would leave the gauge stuck too high.
        teardown_request always runs.
        """
        if g.pop('connection_counted', False):
            active_connections.dec()

    # Refresh the system gauges periodically in the background.
    def update_metrics():
        while True:
            try:
                update_system_metrics()
            except Exception:
                # One failed sample must not kill the sampling thread.
                logger.exception("Failed to update system metrics")
            time.sleep(5)

    thread = threading.Thread(target=update_metrics, daemon=True)
    thread.start()

    return app
```
### 2. 错误监控
示例:实现错误监控

用户请求: "为应用实现错误监控"

Claude Code 生成的代码:

```python
import logging
import traceback
from datetime import datetime
from typing import Dict, List

import sentry_sdk
from flask import request, g, has_app_context, has_request_context
from sentry_sdk.integrations.flask import FlaskIntegration

logger = logging.getLogger(__name__)


class ErrorMonitor:
    """Keeps recent exceptions and per-type counts in memory."""

    def __init__(self, app, max_errors=1000):
        """Create a monitor for ``app``.

        Args:
            app: The Flask application being monitored.
            max_errors: Upper bound on retained error records; the oldest
                are dropped first so a long-running process cannot grow
                without limit.
        """
        self.app = app
        self.max_errors = max_errors
        self.errors = []
        self.error_stats = {}

    def capture_exception(self, exception, context=None):
        """Record ``exception``, update statistics, log it, send to Sentry.

        Args:
            exception: The exception instance to record.
            context: Optional dict of extra data stored with the record.
        """
        error_data = {
            'type': type(exception).__name__,
            'message': str(exception),
            # Format from the exception object itself: format_exc() only
            # works inside an ``except`` block, and error handlers are
            # called outside one.
            'traceback': ''.join(traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )),
            'timestamp': datetime.utcnow().isoformat(),
            'context': context or {}
        }

        # Request details; touching ``request`` outside a request context
        # raises RuntimeError, so check first.
        if has_request_context():
            error_data['request'] = {
                'method': request.method,
                'path': request.path,
                'url': request.url,
                'ip': request.remote_addr,
                'user_agent': request.user_agent.string
            }

        # User details; ``g`` needs an application context.
        if has_app_context() and hasattr(g, 'user_id'):
            error_data['user_id'] = g.user_id

        self.errors.append(error_data)
        overflow = len(self.errors) - self.max_errors
        if overflow > 0:
            del self.errors[:overflow]

        # Update the per-type statistics.
        error_type = error_data['type']
        if error_type not in self.error_stats:
            self.error_stats[error_type] = {
                'count': 0,
                'last_occurrence': None
            }

        self.error_stats[error_type]['count'] += 1
        self.error_stats[error_type]['last_occurrence'] = error_data['timestamp']

        # Nest the payload under one key: ``extra`` may not contain keys
        # that clash with LogRecord attributes such as "message".
        logger.error(
            f"Exception captured: {error_type}",
            extra={'error_data': error_data}
        )

        # Forward to Sentry.
        sentry_sdk.capture_exception(exception)

    def get_errors(self, limit=100):
        """Return up to ``limit`` of the most recent error records."""
        if limit <= 0:
            # errors[-0:] would return the whole list.
            return []
        return self.errors[-limit:]

    def get_error_stats(self):
        """Return the per-type statistics dict (by reference)."""
        return self.error_stats

    def get_frequent_errors(self, threshold=10):
        """Return error types seen at least ``threshold`` times.

        The result is sorted most frequent first.
        """
        frequent_errors = [
            {
                'type': error_type,
                'count': stats['count'],
                'last_occurrence': stats['last_occurrence']
            }
            for error_type, stats in self.error_stats.items()
            if stats['count'] >= threshold
        ]

        return sorted(frequent_errors, key=lambda x: x['count'], reverse=True)

    def clear_errors(self):
        """Forget all recorded errors and statistics."""
        self.errors = []
        self.error_stats = {}


def setup_error_monitoring(app, sentry_dsn, traces_sample_rate=1.0,
                           profiles_sample_rate=1.0):
    """Wire Sentry, error handlers and admin endpoints into ``app``.

    Args:
        app: The Flask application.
        sentry_dsn: DSN passed to ``sentry_sdk.init``.
        traces_sample_rate: Share of transactions traced by Sentry.
        profiles_sample_rate: Share of traced transactions profiled.

    Returns:
        The same ``app`` instance.
    """
    # Initialise Sentry.
    sentry_sdk.init(
        dsn=sentry_dsn,
        integrations=[FlaskIntegration()],
        traces_sample_rate=traces_sample_rate,
        profiles_sample_rate=profiles_sample_rate
    )

    error_monitor = ErrorMonitor(app)

    @app.errorhandler(Exception)
    def handle_exception(e):
        """Catch-all handler for unhandled exceptions."""
        # HTTP errors (403, 405, ...) already are valid responses; pass
        # them through instead of turning them into 500s. Duck-typed so no
        # werkzeug import is needed.
        if isinstance(getattr(e, 'code', None), int) and callable(
            getattr(e, 'get_response', None)
        ):
            return e

        error_monitor.capture_exception(e)

        # Do not echo str(e) to the client: exception text can leak
        # internal details. The full error is in the monitor and Sentry.
        if request.is_json:
            return {'error': 'Internal server error'}, 500
        else:
            return 'Internal server error', 500

    @app.errorhandler(404)
    def handle_not_found(e):
        """Handle 404."""
        logger.warning(f"404 Not Found: {request.path}")
        return {'error': 'Not found'}, 404

    @app.errorhandler(500)
    def handle_server_error(e):
        """Handle 500."""
        error_monitor.capture_exception(e)
        return {'error': 'Internal server error'}, 500

    # NOTE(review): the /admin/errors* endpoints expose tracebacks, client
    # IPs and user ids and have no access control here — protect them
    # before deploying.
    @app.route('/admin/errors')
    def get_errors():
        """Return the recent error records."""
        errors = error_monitor.get_errors()
        return {'errors': errors}

    @app.route('/admin/errors/stats')
    def get_error_stats():
        """Return the per-type error statistics."""
        stats = error_monitor.get_error_stats()
        return {'stats': stats}

    @app.route('/admin/errors/frequent')
    def get_frequent_errors():
        """Return the frequently occurring error types."""
        frequent_errors = error_monitor.get_frequent_errors()
        return {'frequent_errors': frequent_errors}

    return app
```

## 预测性维护

### 1. 
容量预测

示例:实现容量预测

用户请求: "为应用实现容量预测"

Claude Code 生成的代码:

```python
import logging
from datetime import datetime, timedelta
from typing import List, Dict

import numpy as np

logger = logging.getLogger(__name__)


class CapacityPredictor:
    """Linear-trend capacity forecasting over recorded metric samples."""

    def __init__(self):
        """Start with no history and no predictions."""
        self.history = []
        self.predictions = {}

    def add_metric(self, timestamp: datetime, metric_name: str, value: float):
        """Append one sample of ``metric_name`` taken at ``timestamp``."""
        self.history.append({
            'timestamp': timestamp,
            'metric': metric_name,
            'value': value
        })

    def predict_capacity(self, metric_name: str, days: int = 7,
                         min_samples: int = 30) -> List[Dict]:
        """Extrapolate ``metric_name`` for the next ``days`` days.

        The forecast is the latest sample plus the fitted slope times the
        number of steps ahead, one step per day.

        Args:
            metric_name: Metric to forecast.
            days: Number of daily predictions to produce.
            min_samples: Minimum number of samples required.

        Returns:
            A list of ``{'date', 'value', 'metric'}`` dicts, also stored in
            ``self.predictions``; ``[]`` when there are too few samples.
        """
        # Fit in chronological order even if samples were added out of
        # order. Timestamps must be all naive or all aware to be sortable.
        data = sorted(
            (entry for entry in self.history if entry['metric'] == metric_name),
            key=lambda entry: entry['timestamp']
        )

        if not data or len(data) < min_samples:
            logger.warning(f"Insufficient data for {metric_name}")
            return []

        values = [entry['value'] for entry in data]

        # NOTE(review): the slope is per sample, so treating it as per day
        # assumes one sample per day — confirm against the caller.
        trend = self._calculate_trend(values)

        start = datetime.utcnow()
        predictions = [
            {
                'date': (start + timedelta(days=offset)).isoformat(),
                'value': values[-1] + trend * offset,
                'metric': metric_name
            }
            for offset in range(1, days + 1)
        ]

        self.predictions[metric_name] = predictions
        return predictions

    def _calculate_trend(self, values: List[float]) -> float:
        """Return the least-squares slope of ``values`` per sample index."""
        if len(values) < 2:
            return 0.0

        x = np.arange(len(values))
        y = np.array(values)

        # A degree-1 fit returns [slope, intercept].
        slope = np.polyfit(x, y, 1)[0]

        # Plain float rather than a NumPy scalar.
        return float(slope)

    def check_capacity_alerts(self, threshold: float = 0.9) -> List[Dict]:
        """Return every stored prediction at or above ``threshold``.

        The result is sorted highest value first.
        """
        alerts = [
            {
                'metric': metric_name,
                'date': prediction['date'],
                'value': prediction['value'],
                'threshold': threshold
            }
            for metric_name, predictions in self.predictions.items()
            for prediction in predictions
            if prediction['value'] >= threshold
        ]

        return sorted(alerts, key=lambda x: x['value'], reverse=True)

    def get_capacity_recommendations(self, low_utilization: float = 0.3) -> List[Dict]:
        """Turn the stored predictions into scaling recommendations.

        Emits one ``scale_up`` entry if any alert fires at the default
        threshold, and one ``scale_down`` entry per metric whose mean
        predicted value is below ``low_utilization``.
        """
        recommendations = []

        alerts = self.check_capacity_alerts()

        if alerts:
            recommendations.append({
                'type': 'scale_up',
                'message': f"发现 {len(alerts)} 个容量告警,建议扩容",
                'alerts': alerts
            })

        # Look for under-used resources.
        for metric_name, predictions in self.predictions.items():
            if not predictions:
                # np.mean([]) would be NaN with a runtime warning.
                continue

            avg_value = float(np.mean([p['value'] for p in predictions]))

            if avg_value < low_utilization:
                recommendations.append({
                    'type': 'scale_down',
                    'message': f"{metric_name} 平均利用率较低,建议缩容",
                    'metric': metric_name,
                    'avg_value': avg_value
                })

        return recommendations


def run_capacity_prediction():
    """Demo: forecast CPU usage from 30 days of synthetic samples."""
    predictor = CapacityPredictor()

    # Synthetic history: a slow upward drift plus Gaussian noise.
    now = datetime.utcnow()
    for i in range(30):
        timestamp = now - timedelta(days=30 - i)
        value = 0.5 + (i / 100) + np.random.normal(0, 0.05)
        predictor.add_metric(timestamp, 'cpu_usage', value)

    # Forecast.
    print("预测 CPU 使用率...")
    predictions = predictor.predict_capacity('cpu_usage', days=7)

    print("\n预测结果:")
    for prediction in predictions:
        print(f" {prediction['date']}: {prediction['value']:.2%}")

    # Alerts.
    print("\n检查容量告警...")
    alerts = predictor.check_capacity_alerts(threshold=0.9)

    if alerts:
        print(f"发现 {len(alerts)} 个告警:")
        for alert in alerts:
            print(f" {alert['date']}: {alert['value']:.2%} (阈值: {alert['threshold']:.0%})")
    else:
        print("未发现容量告警")

    # Recommendations.
    print("\n获取容量建议...")
    recommendations = predictor.get_capacity_recommendations()

    for rec in recommendations:
        print(f" {rec['type']}: {rec['message']}")


if __name__ == '__main__':
    run_capacity_prediction()
```